Collections — low-level async API
For most use cases prefer the higher-level Batch API (submit_batch()) — it handles collection, run, jobs, and result fetching in one object with streaming results, progress tracking, and callbacks.
Use this page when you need manual control over the collection/run/job lifecycle (e.g. reusing a collection across multiple runs, custom polling, inspecting state).
For very large batches (1,000+ URLs) or when you need different parameters per URL, use collections with server-side execution.
Create and run
# Create a collection — use custom_id to map jobs to your data
col = client.create_collection("my-batch", [
{"url": "https://example.com/tour/1", "custom_id": "tour_1"},
{"url": "https://example.com/tour/2", "custom_id": "tour_2", "browser": True},
{"url": "https://example.com/tour/3", "custom_id": "tour_3", "format": "markdown"},
])
# Run and wait for completion
run = client.run_and_wait(col.id, timeout=300)
# Stream jobs lazily: each job has url + custom_id for traceability
for job in client.iter_run_jobs(col.id, run.run_id):
    if job.status == "completed":
        result = client.get_job_result(col.id, run.run_id, job.job_public_id)
        save(my_id=job.custom_id, content=result.content)
URL traceability with custom_id
Each request can carry an arbitrary custom_id string that the API echoes back in JobExecutionPublic.custom_id and ScrapeResponse.custom_id. Use it to map results to your database without depending on order.
# Submit
col = client.create_collection("reviews-daily", [
{"url": tour["url"], "custom_id": tour["id"]} for tour in my_tours
])
# Map results
run = client.run_and_wait(col.id)
results_by_id = {}
for job in client.iter_run_jobs(col.id, run.run_id):
    if job.status == "completed":
        result = client.get_job_result(col.id, run.run_id, job.job_public_id)
        results_by_id[job.custom_id] = result
You can submit the same URL with different custom_id values (deduped by (url, custom_id)) — useful for re-scraping with different processing pipelines.
Iterating jobs
Four ways, depending on the size of your batch:
# 1. Stream lazily (recommended for >500 jobs)
for job in client.iter_run_jobs(col.id, run.run_id):
    process(job)
# 2. Filter by status
for job in client.iter_run_jobs(col.id, run.run_id, status_filter="completed"):
    process(job)
# 3. Fetch all into memory (default behavior, paginates internally)
jobs = client.get_run_jobs(col.id, run.run_id)
print(f"{len(jobs.items)} jobs total")
# 4. Manual pagination (process the final page too, where has_more is False)
page = client.get_run_jobs(col.id, run.run_id, cursor=None, limit=500)
while True:
    for job in page.items:
        process(job)
    if not page.has_more:
        break
    page = client.get_run_jobs(col.id, run.run_id, cursor=page.cursor_next, limit=500)
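If you paginate manually in more than one place, it can help to wrap the cursor loop in a generator so calling code sees one flat stream. The sketch below is a minimal illustration that runs against a fake in-memory backend rather than the real client: the Page dataclass, iter_all, and the _PAGES table are hypothetical names, and only the items / has_more / cursor_next fields mirror the snippet above.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List, Optional


@dataclass
class Page:
    """Stand-in for a paginated response (hypothetical; mirrors the fields used above)."""
    items: List[str]
    has_more: bool
    cursor_next: Optional[str]


def iter_all(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[str]:
    """Yield every item from every page, including the final one."""
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page.items
        if not page.has_more:
            break
        cursor = page.cursor_next


# Fake three-page backend keyed by cursor, for illustration only.
_PAGES = {
    None: Page(["job_1", "job_2"], True, "c1"),
    "c1": Page(["job_3", "job_4"], True, "c2"),
    "c2": Page(["job_5"], False, None),
}

all_jobs = list(iter_all(lambda cursor: _PAGES[cursor]))
print(all_jobs)
```

With the real client, fetch_page would presumably be a small lambda around client.get_run_jobs(col.id, run.run_id, cursor=cursor, limit=500).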
Rolling your own polling loop
If you're polling a still-running collection with a custom loop wrapped around client.iter_run_jobs() (a typical pattern for code written before v0.7.x), there's an important caveat: the polling-efficiency optimisations added in v0.7.3 and v0.7.4 live in the high-level helpers, not at the wire layer. A custom loop bypasses them.
What you give up by keeping a custom polling loop:
- Adaptive poll_interval (v0.7.3): the SDK picks 5 / 10 / 15 / 30 s based on batch size so large runs don't burn the rate budget. You're hardcoding a value.
- Counter short-circuit (v0.7.4): skipping the jobs-page query when run.success_requests + failed_requests + timeout_requests hasn't moved since the previous tick. Cuts ~50% of polling requests on long-running batches.
- Resilience: transient 5xx / 429 retry, parallel result fetching, progress tracking.
Recommended: migrate to client.iter_results(cid, rid)
If your pattern is "iterate completed jobs, fetch each result, process", that's exactly what client.iter_results(cid, rid) does — and it carries every optimisation listed above. Practically a drop-in:
# Before: custom loop, no optimisations
batch = client.get_batch(cid, rid)
while not batch.is_finished:
    batch.refresh()
    for job in client.iter_run_jobs(cid, rid, status_filter="completed"):
        result = client.get_job_result(cid, rid, job.job_public_id)
        process(result, job.custom_id)
    time.sleep(5)
# After: high-level, optimisations baked in
for result in client.iter_results(cid, rid):
    process(result, result.custom_id)
Works for reattach too (the typical reason to be using get_batch + iter_run_jobs separately):
# After a crash / restart, persisted (cid, rid) → just iterate:
for result in client.iter_results(saved_cid, saved_rid):
    process(result, result.custom_id)
Pass submitted_count=N if you persisted len(payload) and want batch.summary() to report a complete picture after the loop exits (see Batch API → Reattaching).
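Reattaching only works if the identifiers survive the crash, so they need to be written somewhere durable right after submission. Here is one minimal way to do that with a JSON file. The helper names save_run_handle / load_run_handle, the file layout, and the example IDs are all hypothetical and not part of the SDK.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_run_handle(path: Path, cid: str, rid: str, submitted_count: int) -> None:
    """Write the identifiers needed to reattach, using write-then-rename."""
    payload = {"cid": cid, "rid": rid, "submitted_count": submitted_count}
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(payload), encoding="utf-8")
    os.replace(tmp, path)  # rename over the target so readers never see a partial file


def load_run_handle(path: Path) -> Optional[dict]:
    """Return the saved handle, or None if nothing was persisted."""
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))


# Demo with made-up identifiers in a temporary directory.
handle_path = Path(tempfile.mkdtemp()) / "run_handle.json"
save_run_handle(handle_path, "col_123", "run_456", submitted_count=3)
handle = load_run_handle(handle_path)
print(handle)
```

After a restart, handle["cid"] and handle["rid"] play the role of saved_cid and saved_rid above, and handle["submitted_count"] is the N to pass as submitted_count.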
If you can't migrate: replicate the pattern manually
Legitimate reasons to keep the custom loop exist — integration with your own scheduler, fine-grained per-job retry logic, side effects on the polling cadence itself. The recipe below replicates the v0.7.4 optimisations in caller code:
import time
from scrapingpros import adaptive_poll_interval

last_terminal = -1  # sentinel: first tick always fetches
last_completed_at = None  # high-water mark for incremental iter_run_jobs
while True:
    run = client.get_run(cid, rid)
    # Counter short-circuit: skip the jobs-page query when nothing
    # has moved server-side since the previous tick.
    current_terminal = (
        (run.success_requests or 0)
        + (run.failed_requests or 0)
        + (run.timeout_requests or 0)
    )
    if current_terminal != last_terminal and run.all_jobs_persisted is not False:
        for job in client.iter_run_jobs(
            cid, rid,
            status_filter=["completed", "failed", "timeout"],  # v0.7.6: list/CSV
            since_completed_at=last_completed_at,
        ):
            if job.status == "completed":
                result = client.get_job_result(cid, rid, job.job_public_id)
                process(result, job.custom_id)
            else:
                process_failure(job)
            if job.completed_at is not None:
                last_completed_at = job.completed_at
        last_terminal = current_terminal
    if run.status in ("completed", "failed", "cancelled"):
        break
    # Adaptive cadence sized to the batch: small batches stay
    # responsive, long batches don't saturate the rate limit.
    time.sleep(adaptive_poll_interval(run.total_requests))
adaptive_poll_interval(n, kind="jobs") is exported from the top-level package since v0.7.3; kind="status" returns the tighter table for status-only polling. The recipe above uses "jobs" because each tick that fires the inner loop is jobs-page-heavy.
iter_run_jobs accepts since_completed_at= so the inner loop only paginates jobs that completed after the previous tick's high-water mark — that's how Batch.iter_results avoids re-reading every job on every tick. Since v0.7.6 status_filter also accepts a list or CSV string, so a single paginated stream drains all three terminal states (one request instead of three). The all_jobs_persisted guard (v0.7.6+) skips the inner drain while the server is still seeding jobs — Batch.iter_results does this automatically.
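To make the high-water-mark idea concrete, here is a small in-memory simulation of two polling ticks. It does not call the API: drain_new is a hypothetical stand-in for the server-side since_completed_at filter, and the strict "greater than" comparison is an assumption, since the exact boundary semantics aren't stated here.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Tuple

# (job id, completed_at or None while the job is still running)
Job = Tuple[str, Optional[datetime]]


def drain_new(jobs: List[Job], since: Optional[datetime]):
    """Return ids of jobs completed strictly after `since`, plus the new mark."""
    fresh = [
        (job_id, done)
        for job_id, done in jobs
        if done is not None and (since is None or done > since)
    ]
    fresh.sort(key=lambda pair: pair[1])
    mark = fresh[-1][1] if fresh else since  # keep the old mark if nothing is new
    return [job_id for job_id, _ in fresh], mark


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
tick1 = [("a", t0), ("b", None), ("c", None)]
tick2 = [("a", t0), ("b", t0 + timedelta(seconds=7)), ("c", None)]

seen1, mark = drain_new(tick1, None)   # first tick: no mark yet
seen2, mark = drain_new(tick2, mark)   # second tick: only jobs newer than the mark
print(seen1, seen2)
```

The second tick returns only job "b", so job "a" is not processed again even though it is still present in the run.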
JobExecutionPublic fields
Every job in iter_run_jobs() / get_run_jobs() contains:
| Field | Description |
|---|---|
| job_public_id | Unique job ID, used to fetch the result |
| url | The URL that was scraped |
| custom_id | Your traceability ID (echoed back from the request) |
| status | "processing", "completed", "failed", "timeout" |
| status_code | HTTP status from the target site |
| is_success | Server verdict of whether the job produced usable content: True / False / None (legacy) |
| queued_at, started_at, completed_at | Lifecycle timestamps (datetime) |
| execution_time_ms | Total execution time in milliseconds |
| retries_attempted | Internal retry count |
| block_reason | Why the job was flagged as blocked (if any) |
| protection_stack | Detected protections (e.g. ["cloudflare", "datadome"]) |
| rule_hits | Validator rules that matched |
| has_extractable_data | True / False / None: whether the page contained structured data (JSON-LD, microdata, OpenGraph, __NEXT_DATA__). Independent of is_success. (v0.5.0+) |
| validator_version | Version of the HTML Validator that produced is_success and friends. Pin in tests to detect classifier upgrades. (v0.5.0+) |
| client_id | Client account that owns the job. (v0.5.0+) |
| url_truncated | True if the URL was longer than 2048 chars and got truncated |
Use is_success — don't re-implement the check
is_success is the server's authoritative verdict — the same one used to compute run.success_requests. Prefer it over writing your own check on status_code + body size: the server catches soft-blocks (Google CAPTCHA pages with 200 + large body, Amazon "Robot Check", etc.) that a simple heuristic misses.
for job in client.iter_run_jobs(col.id, run.run_id):
    if job.is_success:
        result = client.get_job_result(col.id, run.run_id, job.job_public_id)
        save(result)
    else:
        log_failed(job.url, reason=job.block_reason or f"http_{job.status_code}")
The Batch.iter_results() API already honors this internally — result.guidance.success reflects job.is_success.
Success criterion (policy pinning)
Each run carries the classification policy as metadata. Pin the version in integration tests to catch silent policy changes:
run = client.get_run(col.id, run.run_id)
assert run.success_criterion.version == "content_success_v1"
The current policy content_success_v1 classifies a job as success when all of these hold:
- status == "completed"
- 200 <= status_code < 300
- potentiallyBlockedByCaptcha is false
- block_reason is null or "none"
If the policy ever changes, the version bumps (e.g. v2) and your pinned test fails loudly.
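For reading or unit-testing purposes, the four conditions of content_success_v1 can be written out as a plain predicate. This is only an illustration of the rule as described on this page, not the server's implementation, and is_success remains the authoritative value. The function name and its snake_case parameters are hypothetical.

```python
from typing import Optional


def content_success_v1(
    status: str,
    status_code: Optional[int],
    potentially_blocked_by_captcha: bool,
    block_reason: Optional[str],
) -> bool:
    """Local restatement of the documented policy; all four conditions must hold."""
    return (
        status == "completed"
        and status_code is not None
        and 200 <= status_code < 300
        and not potentially_blocked_by_captcha
        and block_reason in (None, "none")
    )


clean_page = content_success_v1("completed", 200, False, None)
soft_block = content_success_v1("completed", 200, True, None)    # 200 but CAPTCHA flagged
not_found = content_success_v1("completed", 404, False, "none")
timed_out = content_success_v1("timeout", None, False, None)
print(clean_page, soft_block, not_found, timed_out)
```

The soft_block case shows why a status-code check alone is not enough: the response is a 200, but the CAPTCHA flag makes the job a failure under this policy.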
Webhooks
Get notified when a batch completes instead of polling:
col = client.create_collection(
"my-batch",
[{"url": "https://example.com/1"}, {"url": "https://example.com/2"}],
callback_url="https://your-server.com/webhook",
)
run = client.create_run(col.id)
# Your server receives a POST when done:
# {"event": "run.completed", "run_id": "...", "job_ids": [...]}
# Signed with HMAC-SHA256 in X-SP-Signature header
Check delivery status:
run = client.get_run(col.id, run.run_id)
print(run.callback_status) # "sent", "pending", "failed", "retrying"
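On the receiving side, the signature should be checked before the payload is trusted. The sketch below shows a generic HMAC-SHA256 check using only the standard library. It assumes the X-SP-Signature header carries a hex digest computed over the raw request body with a shared secret. The encoding, the signed material, and where the secret comes from are not specified on this page, so treat all three as assumptions to confirm. verify_signature is a hypothetical helper.

```python
import hashlib
import hmac


def verify_signature(secret: bytes, raw_body: bytes, header_value: str) -> bool:
    """Recompute the HMAC-SHA256 of the body and compare in constant time."""
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, header_value)


# Demo values only; the secret and body are made up.
secret = b"example-shared-secret"
body = b'{"event": "run.completed", "run_id": "r1", "job_ids": []}'
good_sig = hmac.new(secret, body, hashlib.sha256).hexdigest()

valid = verify_signature(secret, body, good_sig)
tampered = verify_signature(secret, body + b" ", good_sig)
print(valid, tampered)
```

Verify against the raw bytes as received, before any JSON parsing, since re-serialising the payload can change whitespace and break the digest.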
Manual polling
import time
run = client.create_run(col.id)
while True:
    run = client.get_run(col.id, run.run_id)
    if run.status in ("completed", "failed", "cancelled"):
        break
    print(f"Progress: {run.success_requests}/{run.total_requests}")
    time.sleep(5)
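The loop above runs forever if the run never reaches a terminal state. A deadline is a cheap safeguard. Below is a minimal sketch of a bounded polling helper: wait_for_terminal and its parameters are hypothetical, the status source is injected so the example runs without a client, and "running" is only a placeholder for a non-terminal status.

```python
import time
from typing import Callable

TERMINAL_STATES = {"completed", "failed", "cancelled"}


def wait_for_terminal(
    get_status: Callable[[], str],
    interval_s: float,
    timeout_s: float,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal state or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still {status!r} after {timeout_s} s")
        sleep(interval_s)


# Simulated status sequence; sleeping is stubbed out so the demo returns at once.
_statuses = iter(["running", "running", "completed"])
final = wait_for_terminal(
    lambda: next(_statuses), interval_s=5, timeout_s=60, sleep=lambda _s: None
)
print(final)
```

With the real client, get_status would presumably be lambda: client.get_run(col.id, run.run_id).status.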
Collection management
# List all collections
collections = client.list_collections()
# Get a specific collection
col = client.get_collection("collection-id")
# Update a collection
client.update_collection("collection-id", "new-name", [
{"url": "https://example.com/updated"},
])
# Delete (runs are not affected)
client.delete_collection("collection-id")
Retention
- Job metadata (status, timings, custom_id, URL) — retained 90 days
- HTML / markdown / extracted_data — available for 48 hours after job completion
For longer-term archival, save the result on your side immediately after fetching.
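As one possible shape for that archival step, the sketch below writes each result to its own JSON file named after its custom_id. The archive_result helper, the file layout, and the sample values are hypothetical. In real use the content and metadata would come from the result and job objects you fetched.

```python
import json
import tempfile
from pathlib import Path


def archive_result(root: Path, custom_id: str, content: str, meta: dict) -> Path:
    """Write one result to <root>/<sanitised custom_id>.json and return the path."""
    root.mkdir(parents=True, exist_ok=True)
    # Replace characters that are unsafe in file names (e.g. "/") with "_".
    safe = "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in custom_id)
    target = root / f"{safe}.json"
    record = {"custom_id": custom_id, "meta": meta, "content": content}
    target.write_text(json.dumps(record), encoding="utf-8")
    return target


# Demo with made-up data in a temporary directory.
archive_root = Path(tempfile.mkdtemp()) / "archive"
saved = archive_result(
    archive_root, "tour/1", "<html>...</html>", {"url": "https://example.com/tour/1"}
)
restored = json.loads(saved.read_text(encoding="utf-8"))
print(saved.name, restored["custom_id"])
```

Sanitising can map two different custom_id values to the same file name, so a production version would need a collision-safe scheme such as hashing the ID.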